/*
 * Copyright (C) 2025 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates.utils;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.cloud.teleport.v2.spanner.ddl.Column;
import com.google.cloud.teleport.v2.spanner.ddl.Ddl;
import com.google.cloud.teleport.v2.spanner.ddl.IndexColumn;
import com.google.cloud.teleport.v2.spanner.ddl.Table;
import com.google.cloud.teleport.v2.spanner.sourceddl.SourceColumn;
import com.google.cloud.teleport.v2.spanner.sourceddl.SourceDatabaseType;
import com.google.cloud.teleport.v2.spanner.sourceddl.SourceSchema;
import com.google.cloud.teleport.v2.spanner.sourceddl.SourceTable;
import com.google.cloud.teleport.v2.spanner.type.Type;
import com.google.cloud.teleport.v2.templates.constants.Constants;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * The SchemaUtils class provides helper functions to build Spanner DDL, shadow-table DDL and the
 * source schema from a session file. A session file is a JSON file that captures the schema
 * mapping from source to Spanner, generated by HarbourBridge.
 */
public final class SchemaUtils {

  // Prevent instantiation of this utility class.
  private SchemaUtils() {}

  // Defines constants for keys used in the session file JSON structure.
  private static final String KEY_SP_SCHEMA = "SpSchema";
  private static final String KEY_SRC_SCHEMA = "SrcSchema";
  private static final String KEY_NAME = "Name";
  private static final String KEY_COL_DEFS = "ColDefs";
  private static final String KEY_TYPE_SPANNER = "T";
  private static final String KEY_TYPE_SOURCE = "Type";
  private static final String KEY_IS_ARRAY = "IsArray";
  private static final String KEY_PRIMARY_KEYS = "PrimaryKeys";
  private static final String KEY_COL_ID = "ColId";
  private static final String KEY_SCHEMA = "Schema";
  private static final String KEY_LEN = "Len";
  private static final String KEY_NOT_NULL = "NotNull";

  // Defines constants for Spanner data types found in the session file.
  private static final String SPANNER_TYPE_STRING = "STRING";
  private static final String SPANNER_TYPE_INT64 = "INT64";
  private static final String SPANNER_TYPE_FLOAT32 = "FLOAT32";
  private static final String SPANNER_TYPE_FLOAT64 = "FLOAT64";
  private static final String SPANNER_TYPE_BOOL = "BOOL";
  private static final String SPANNER_TYPE_BYTES = "BYTES";
  private static final String SPANNER_TYPE_TIMESTAMP = "TIMESTAMP";
  private static final String SPANNER_TYPE_DATE = "DATE";
  private static final String SPANNER_TYPE_NUMERIC = "NUMERIC";
  private static final String SPANNER_TYPE_JSON = "JSON";

  // Prefix prepended to a Spanner table name to form the name of its shadow table.
  private static final String SHADOW_TABLE_PREFIX = "shadow_";

  // Shared parser. Jackson documents ObjectMapper as thread-safe as long as it is not reconfigured
  // after first use; it is never reconfigured here.
  private static final ObjectMapper MAPPER = new ObjectMapper();

  /**
   * Builds a Spanner {@link Ddl} object from a HarbourBridge session file.
   *
   * <p>One table is created per entry of the {@code SpSchema} section, with one column per entry of
   * the table's {@code ColDefs} and a primary key made of the columns listed under {@code
   * PrimaryKeys} (all ascending). No primary key is declared when that list is absent or empty.
   *
   * @param sessionFile Path to the session file.
   * @return A Spanner {@link Ddl} object representing the schema.
   * @throws RuntimeException if the file cannot be read or parsed, a required section is missing,
   *     or a column uses an unsupported Spanner type; the underlying failure is the cause.
   */
  public static Ddl buildSpannerDdlFromSessionFile(String sessionFile) {
    try {
      Map<String, Object> spSchema =
          requireSection(readSession(sessionFile), KEY_SP_SCHEMA, "session file");
      Ddl.Builder ddlBuilder = Ddl.builder();

      for (Object tableDef : spSchema.values()) {
        Map<String, Object> tableMap = asMap(tableDef);
        String tableName = (String) tableMap.get(KEY_NAME);
        Table.Builder tableBuilder = ddlBuilder.createTable(tableName);
        Map<String, Object> colDefs =
            requireSection(tableMap, KEY_COL_DEFS, "Spanner table " + tableName);

        for (Object colDef : colDefs.values()) {
          addSpannerColumn(tableBuilder, asMap(colDef));
        }

        addPrimaryKey(tableBuilder, resolvePrimaryKeyColumnNames(tableMap, colDefs, tableName));
        tableBuilder.endTable();
      }
      return ddlBuilder.build();
    } catch (Exception e) {
      throw new RuntimeException("Error reading or parsing session file: " + sessionFile, e);
    }
  }

  /**
   * Builds a {@link SourceSchema} object from a HarbourBridge session file.
   *
   * <p>A missing {@code SrcSchema} section yields a schema with no tables, and a table without
   * {@code ColDefs} yields a table with no columns. The database type and name are currently fixed
   * (see the TODO in the body) rather than read from the file.
   *
   * @param sessionFile Path to the session file.
   * @return A {@link SourceSchema} object representing the source database schema.
   * @throws RuntimeException if the file cannot be read or parsed, or a primary key refers to a
   *     column id that is not defined; the underlying failure is the cause.
   */
  public static SourceSchema buildSourceSchemaFromSessionFile(String sessionFile) {
    try {
      Map<String, Object> session = readSession(sessionFile);

      // TODO: Read database type and name from session file.
      SourceDatabaseType dbType = SourceDatabaseType.MYSQL;
      String dbName = "test-db";
      Map<String, Object> srcSchema = asMap(session.get(KEY_SRC_SCHEMA));
      SourceSchema.Builder schemaBuilder = SourceSchema.builder(dbType).databaseName(dbName);
      ImmutableMap.Builder<String, SourceTable> tablesBuilder = ImmutableMap.builder();

      if (srcSchema != null) {
        for (Object tableDef : srcSchema.values()) {
          Map<String, Object> tableMap = asMap(tableDef);
          String tableName = (String) tableMap.get(KEY_NAME);
          SourceTable.Builder tableBuilder =
              SourceTable.builder(dbType).name(tableName).schema((String) tableMap.get(KEY_SCHEMA));

          // ColDefs is optional for source tables; absence simply means "no columns".
          Map<String, Object> colDefs = asMap(tableMap.get(KEY_COL_DEFS));
          ImmutableList.Builder<SourceColumn> columnsBuilder = ImmutableList.builder();
          if (colDefs != null) {
            for (Object colDef : colDefs.values()) {
              columnsBuilder.add(buildSourceColumn(dbType, asMap(colDef)));
            }
          }
          tableBuilder.columns(columnsBuilder.build());

          ImmutableList.Builder<String> pkCols = ImmutableList.builder();
          pkCols.addAll(resolvePrimaryKeyColumnNames(tableMap, colDefs, tableName));
          tableBuilder.primaryKeyColumns(pkCols.build());
          tablesBuilder.put(tableName, tableBuilder.build());
        }
      }
      schemaBuilder.tables(tablesBuilder.build());
      return schemaBuilder.build();
    } catch (Exception e) {
      throw new RuntimeException("Error reading or parsing session file: " + sessionFile, e);
    }
  }

  /**
   * Builds the Spanner {@link Ddl} of the shadow tables for a HarbourBridge session file.
   *
   * <p>For every table of the {@code SpSchema} section a table named {@code "shadow_" + tableName}
   * is created. It contains only the primary key columns of the original table (in the order they
   * appear in {@code ColDefs}), followed by the {@link Constants#PROCESSED_COMMIT_TS_COLUMN_NAME}
   * timestamp column and the {@link Constants#RECORD_SEQ_COLUMN_NAME} int64 column. The primary key
   * of the shadow table lists the same columns, in {@code PrimaryKeys} order, as the original.
   *
   * @param sessionFile Path to the session file.
   * @return A Spanner {@link Ddl} object containing one shadow table per Spanner table.
   * @throws RuntimeException if the file cannot be read or parsed, a required section is missing,
   *     or a primary key column uses an unsupported Spanner type; the underlying failure is the
   *     cause.
   */
  public static Ddl buildSpannerShadowTableDdlFromSessionFile(String sessionFile) {
    try {
      Map<String, Object> spSchema =
          requireSection(readSession(sessionFile), KEY_SP_SCHEMA, "session file");
      Ddl.Builder ddlBuilder = Ddl.builder();

      for (Object tableDef : spSchema.values()) {
        Map<String, Object> tableMap = asMap(tableDef);
        String tableName = (String) tableMap.get(KEY_NAME);
        Table.Builder tableBuilder = ddlBuilder.createTable(SHADOW_TABLE_PREFIX + tableName);
        Map<String, Object> colDefs =
            requireSection(tableMap, KEY_COL_DEFS, "Spanner table " + tableName);
        List<String> pkColNames = resolvePrimaryKeyColumnNames(tableMap, colDefs, tableName);

        // Walk ColDefs (not the key list) so the shadow columns keep their ColDefs order.
        for (Object colDef : colDefs.values()) {
          Map<String, Object> colMap = asMap(colDef);
          if (pkColNames.contains((String) colMap.get(KEY_NAME))) {
            addSpannerColumn(tableBuilder, colMap);
          }
        }

        Column.Builder processedCommitTimestampColumnBuilder =
            tableBuilder.column(Constants.PROCESSED_COMMIT_TS_COLUMN_NAME);
        tableBuilder.addColumn(
            processedCommitTimestampColumnBuilder
                .type(Type.timestamp())
                .notNull(false)
                .autoBuild());

        Column.Builder recordSequenceColumnBuilder =
            tableBuilder.column(Constants.RECORD_SEQ_COLUMN_NAME);
        tableBuilder.addColumn(
            recordSequenceColumnBuilder.type(Type.int64()).notNull(false).autoBuild());

        addPrimaryKey(tableBuilder, pkColNames);
        tableBuilder.endTable();
      }
      return ddlBuilder.build();
    } catch (Exception e) {
      throw new RuntimeException("Error reading or parsing session file: " + sessionFile, e);
    }
  }

  /** Parses the session file at the given path into a generic JSON object map. */
  @SuppressWarnings("unchecked")
  private static Map<String, Object> readSession(String sessionFile) throws java.io.IOException {
    Map<String, Object> session = MAPPER.readValue(new File(sessionFile), Map.class);
    if (session == null) {
      throw new IllegalArgumentException("Session file does not contain a JSON object");
    }
    return session;
  }

  /** Views a parsed JSON value as an object map; {@code null} is passed through unchanged. */
  @SuppressWarnings("unchecked")
  private static Map<String, Object> asMap(Object value) {
    return (Map<String, Object>) value;
  }

  /** Views a parsed JSON value as a list of object maps; {@code null} is passed through. */
  @SuppressWarnings("unchecked")
  private static List<Map<String, Object>> asListOfMaps(Object value) {
    return (List<Map<String, Object>>) value;
  }

  /** Returns the object stored under {@code key}, failing with a descriptive error if absent. */
  private static Map<String, Object> requireSection(
      Map<String, Object> parent, String key, String context) {
    Map<String, Object> section = asMap(parent.get(key));
    if (section == null) {
      throw new IllegalArgumentException("Missing \"" + key + "\" in " + context);
    }
    return section;
  }

  /**
   * Maps the {@code PrimaryKeys} entries of a table to column names, in key order.
   *
   * @return the key column names; empty when the table declares no {@code PrimaryKeys}.
   */
  private static List<String> resolvePrimaryKeyColumnNames(
      Map<String, Object> tableMap, Map<String, Object> colDefs, String tableName) {
    List<String> names = new ArrayList<>();
    List<Map<String, Object>> pks = asListOfMaps(tableMap.get(KEY_PRIMARY_KEYS));
    if (pks == null) {
      return names;
    }
    for (Map<String, Object> pk : pks) {
      String colId = (String) pk.get(KEY_COL_ID);
      // colDefs may legitimately be null for source tables; a key entry then cannot be resolved.
      Map<String, Object> colMap = colDefs == null ? null : asMap(colDefs.get(colId));
      if (colMap == null) {
        throw new IllegalArgumentException(
            "Primary key of table " + tableName + " references undefined column id: " + colId);
      }
      names.add((String) colMap.get(KEY_NAME));
    }
    return names;
  }

  /** Declares an ascending primary key over the given columns; does nothing for an empty list. */
  private static void addPrimaryKey(Table.Builder tableBuilder, List<String> pkColNames) {
    if (pkColNames.isEmpty()) {
      return;
    }
    IndexColumn.IndexColumnsBuilder<Table.Builder> pkBuilder = tableBuilder.primaryKey();
    for (String pkColName : pkColNames) {
      pkBuilder.asc(pkColName);
    }
    pkBuilder.end();
  }

  /** Adds the Spanner column described by one {@code ColDefs} entry to the table builder. */
  private static void addSpannerColumn(Table.Builder tableBuilder, Map<String, Object> colMap) {
    String colName = (String) colMap.get(KEY_NAME);
    Map<String, Object> typeMap = asMap(colMap.get(KEY_TYPE_SPANNER));
    if (typeMap == null || typeMap.get(KEY_NAME) == null) {
      throw new IllegalArgumentException(
          "Missing Spanner type in session file for column: " + colName);
    }
    String typeName = (String) typeMap.get(KEY_NAME);
    Boolean isArray = (Boolean) typeMap.get(KEY_IS_ARRAY);

    // NOTE(review): IsArray is only honoured for STRING. A column of any other type with
    // IsArray=true is created as a scalar column - confirm whether that is intended.
    switch (typeName) {
      case SPANNER_TYPE_STRING:
        if (Boolean.TRUE.equals(isArray)) {
          tableBuilder.column(colName).array(Type.string()).endColumn();
        } else {
          tableBuilder.column(colName).string().max().endColumn();
        }
        break;
      case SPANNER_TYPE_INT64:
        tableBuilder.column(colName).int64().endColumn();
        break;
      case SPANNER_TYPE_FLOAT32:
        tableBuilder.column(colName).float32().endColumn();
        break;
      case SPANNER_TYPE_FLOAT64:
        tableBuilder.column(colName).float64().endColumn();
        break;
      case SPANNER_TYPE_BOOL:
        tableBuilder.column(colName).bool().endColumn();
        break;
      case SPANNER_TYPE_BYTES:
        tableBuilder.column(colName).bytes().max().endColumn();
        break;
      case SPANNER_TYPE_TIMESTAMP:
        tableBuilder.column(colName).timestamp().endColumn();
        break;
      case SPANNER_TYPE_DATE:
        tableBuilder.column(colName).date().endColumn();
        break;
      case SPANNER_TYPE_NUMERIC:
        tableBuilder.column(colName).numeric().endColumn();
        break;
      case SPANNER_TYPE_JSON:
        tableBuilder.column(colName).json().endColumn();
        break;
      default:
        throw new IllegalArgumentException(
            "Unsupported Spanner type in session file: " + typeName);
    }
  }

  /** Builds the {@link SourceColumn} described by one source {@code ColDefs} entry. */
  private static SourceColumn buildSourceColumn(
      SourceDatabaseType dbType, Map<String, Object> colMap) {
    String colName = (String) colMap.get(KEY_NAME);
    Map<String, Object> typeMap = asMap(colMap.get(KEY_TYPE_SOURCE));
    if (typeMap == null) {
      throw new IllegalArgumentException(
          "Missing source type in session file for column: " + colName);
    }
    String typeName = (String) typeMap.get(KEY_NAME);

    // Len is optional; any non-numeric or absent value leaves the size unset.
    Long size = null;
    Object len = typeMap.get(KEY_LEN);
    if (len instanceof Number) {
      size = ((Number) len).longValue();
    }

    // A column is nullable unless NotNull is explicitly true.
    Boolean notNull = (Boolean) colMap.get(KEY_NOT_NULL);
    return SourceColumn.builder(dbType)
        .name(colName)
        .type(typeName)
        .isNullable(!Boolean.TRUE.equals(notNull))
        .size(size)
        .build();
  }
}
